Kafka Topics
What is a Kafka Topic?
A Kafka Topic is a category or feed name to which records (messages) are published. Topics are the primary way data is organized and distributed in Kafka. Think of a topic as a channel on which producers send messages and from which consumers read messages.
Key Characteristics of Kafka Topics
1. Partitioned Structure
- Each topic is divided into one or more partitions
- Partitions are the unit of parallelism in Kafka
- Messages within a partition are ordered
- Order is not guaranteed across partitions
2. Immutable Log
- Each partition is an ordered, immutable sequence of records
- Records are appended to the end of the log
- Records are identified by their offset within the partition
- Once written, records cannot be modified; they are only removed when the retention policy (time- or size-based) expires, or superseded by compaction
3. Replication
- Partitions can be replicated across multiple brokers
- Provides fault tolerance and high availability
- One replica is designated as the leader
- Other replicas are followers
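To make the partition and offset model above concrete, here is a minimal consumer sketch (the broker address, topic name, and group id are illustrative assumptions) that prints the partition and offset of each record it reads:
// Minimal sketch illustrating points 1 and 2: every record belongs to exactly
// one partition and is identified by its offset within that partition.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PartitionOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // Offsets are only comparable within a single partition
                System.out.printf("partition=%d offset=%d key=%s%n",
                        record.partition(), record.offset(), record.key());
            }
        }
    }
}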
Topic Configuration
Essential Topic Properties
# Create topic with basic configuration
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 3 \
--partitions 6 \
--topic my-topic
Key Configuration Parameters
| Parameter | Description | Default | Recommended |
|---|---|---|---|
| num.partitions | Number of partitions | 1 | Based on throughput needs |
| replication.factor | Number of replicas | 1 | 3 for production |
| retention.ms | How long to keep messages | 604800000 (7 days) | Based on requirements |
| retention.bytes | Max size per partition before deletion | -1 (unlimited) | Based on storage |
| cleanup.policy | Cleanup strategy | delete | delete or compact |
| compression.type | Compression algorithm | producer | snappy or lz4 |
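Topics can also be created programmatically. The sketch below is a rough Java AdminClient equivalent of the CLI command above; the topic name, partition count, and configuration values are illustrative assumptions, not requirements.
// Create a topic with explicit configs via the AdminClient
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("my-topic", 6, (short) 3)
                    .configs(Map.of(
                            "retention.ms", "604800000",  // 7 days
                            "cleanup.policy", "delete",
                            "compression.type", "snappy"));
            // get() blocks until the cluster acknowledges the creation
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}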
Topic Management
1. Creating Topics
# Basic topic creation
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 3 \
--replication-factor 2
# Topic with custom configuration
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=86400000 \
--config cleanup.policy=delete \
--config compression.type=snappy
2. Listing Topics
# List all topics
kafka-topics.sh --list \
--bootstrap-server localhost:9092
# List topics with details
kafka-topics.sh --describe \
--bootstrap-server localhost:9092
# Describe specific topic
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic
3. Modifying Topics
# Increase partitions (can only increase, never decrease)
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 12
# Update topic configuration
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=172800000
4. Deleting Topics
# Delete topic
kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic my-topic
Topic Design Patterns
1. Single Topic Pattern
# Simple single topic for all messages
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 6 \
--replication-factor 3
Use Cases:
- Simple applications
- Single data stream
- Limited message types
2. Multi-Topic Pattern
# Separate topics for different message types
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-registrations --partitions 3
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-logins --partitions 6
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-purchases --partitions 12
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic system-logs --partitions 2
Use Cases:
- Different message types
- Different retention requirements
- Different processing requirements
3. Topic Naming Conventions
# Environment-based naming
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic prod-user-events
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic staging-user-events
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic dev-user-events
# Domain-based naming
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ecommerce-orders
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ecommerce-inventory
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ecommerce-payments
# Event-based naming
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user.created
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user.updated
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user.deleted
Partitioning Strategies
1. Key-Based Partitioning
// Producer with key-based partitioning
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "user-123", "user data");
producer.send(record);
Benefits:
- Messages with same key go to same partition
- Maintains ordering for related messages
- Predictable partitioning
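To see this behaviour directly, the sketch below sends two records with the same key and compares the partitions reported in their RecordMetadata; the broker address, topic, and key are illustrative assumptions.
// Two records with the same key land on the same partition (default partitioner)
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SameKeySamePartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            RecordMetadata m1 = producer.send(
                new ProducerRecord<>("my-topic", "user-123", "login")).get();
            RecordMetadata m2 = producer.send(
                new ProducerRecord<>("my-topic", "user-123", "purchase")).get();
            // Equal keys hash to the same partition, preserving their relative order
            System.out.println(m1.partition() == m2.partition()); // true
        }
    }
}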
2. Round-Robin Partitioning
// Producer with null key: records are spread across partitions
// (round-robin in older clients; sticky partitioning since Kafka 2.4)
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", null, "message");
producer.send(record);
Benefits:
- Even distribution across partitions
- Maximum parallelism
Trade-off:
- No ordering guarantees, since related records are not tied to a single partition
3. Custom Partitioning
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Custom logic based on business requirements
        if (key instanceof String) {
            String keyStr = (String) key;
            if (keyStr.startsWith("user-")) {
                return 0; // User events to partition 0
            } else if (keyStr.startsWith("system-")) {
                return 1; // System events to partition 1
            }
        }
        // Default hash-based partitioning; mask the sign bit so the result is never negative.
        // Null keys all land on partition 0 in this simple example.
        return (key == null ? 0 : (key.hashCode() & Integer.MAX_VALUE)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
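To activate the partitioner, pass its class name in the producer configuration. A usage sketch, assuming the CustomPartitioner class above is on the producer's classpath and the usual producer imports are in place:
// Register the custom partitioner via producer config
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);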
Retention and Cleanup Policies
1. Time-Based Retention
# Keep messages for 30 days
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=2592000000
2. Size-Based Retention
# Keep messages until topic reaches 1GB
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--add-config retention.bytes=1073741824
3. Log Compaction
# Enable log compaction for key-value data
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name user-profiles \
--add-config cleanup.policy=compact
Use Cases for Log Compaction:
- User profiles
- Configuration data
- State snapshots
- Change data capture
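For reference, the same compaction setting can be applied from Java with the AdminClient's incremental config API. This is a minimal sketch; the topic name is taken from the CLI example above and the broker address is an assumption.
// Enable log compaction on an existing topic via the AdminClient
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EnableCompaction {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "user-profiles");
            AlterConfigOp enableCompact = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(enableCompact))).all().get();
        }
    }
}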
Topic Monitoring and Management
1. Topic Metrics
# Get per-topic message rate via JMX (requires JMX enabled on the broker, e.g. JMX_PORT=9999)
kafka-run-class.sh kafka.tools.JmxTool \
--object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=my-topic \
--jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
# Monitor consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --describe
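Consumer lag can also be computed programmatically by comparing a group's committed offsets with each partition's log-end offset. A minimal AdminClient sketch, reusing the group name from the command above; the broker address is an assumption.
// Lag per partition = log-end offset - committed offset
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("my-consumer-group")
                     .partitionsToOffsetAndMetadata().get();

            Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
            committed.keySet().forEach(tp -> latestSpec.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                admin.listOffsets(latestSpec).all().get();

            committed.forEach((tp, offset) -> {
                long lag = latest.get(tp).offset() - offset.offset();
                System.out.printf("%s lag=%d%n", tp, lag);
            });
        }
    }
}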
2. Topic Health Checks
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TopicHealthCheck {
    private final KafkaConsumer<String, String> consumer;

    public TopicHealthCheck(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public boolean isTopicHealthy(String topicName) {
        try {
            // Check if topic exists and is accessible
            List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
            return partitions != null && !partitions.isEmpty();
        } catch (Exception e) {
            return false;
        }
    }

    // Returns each partition's end offset (an upper bound on message count),
    // not the on-disk size in bytes
    public Map<String, Long> getTopicSizes(String topicName) {
        Map<String, Long> partitionSizes = new HashMap<>();
        for (PartitionInfo partition : consumer.partitionsFor(topicName)) {
            TopicPartition tp = new TopicPartition(topicName, partition.partition());
            long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            partitionSizes.put("partition-" + partition.partition(), endOffset);
        }
        return partitionSizes;
    }
}
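A hypothetical way to wire up the health check above; the broker address and topic name are assumptions, and no group.id is needed because only metadata and end offsets are queried.
// Usage sketch for TopicHealthCheck
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicHealthCheck check = new TopicHealthCheck(consumer);
    System.out.println("healthy: " + check.isTopicHealthy("my-topic"));
    System.out.println("end offsets: " + check.getTopicSizes("my-topic"));
}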
3. Topic Administration Tools
# Get topic configuration
kafka-configs.sh --describe \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic
# Describe all consumer groups (filter the output by topic name to find a topic's consumers)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--list | xargs -I {} kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 --group {} --describe
Topic Security
1. Access Control Lists (ACLs)
# Allow producer access
kafka-acls.sh --add \
--allow-principal User:producer \
--producer \
--topic my-topic \
--bootstrap-server localhost:9092
# Allow consumer access
kafka-acls.sh --add \
--allow-principal User:consumer \
--consumer \
--topic my-topic \
--group my-consumer-group \
--bootstrap-server localhost:9092
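The same producer ACL can also be created from Java with the AdminClient. Note that the CLI's --producer convenience flag grants WRITE, DESCRIBE, and CREATE on the topic, while this sketch adds only the WRITE rule; the principal and topic names are illustrative, and an authorizer must be enabled on the brokers.
// Grant User:producer write access to my-topic
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AddProducerAcl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            AclBinding writeAcl = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL),
                new AccessControlEntry("User:producer", "*",
                    AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(List.of(writeAcl)).all().get();
        }
    }
}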
2. Topic Encryption
Kafka does not provide a per-topic encryption configuration. Encryption in transit is configured on broker listeners and clients via TLS (for example, security.protocol=SSL), encryption at rest is typically handled with disk or filesystem encryption, and true end-to-end encryption requires encrypting message payloads in the producer and decrypting them in the consumer (or using a vendor add-on that does this client-side).
Best Practices for Topic Design
1. Partitioning Strategy
# Rough sizing rule: partitions ≈ max(target throughput / producer throughput per partition,
#                                     target throughput / consumer throughput per partition)
# Example: a 10,000 msg/sec target at ~1,000 msg/sec per partition needs at least 10 partitions
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic high-throughput-topic \
--partitions 12 \
--replication-factor 3
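As a worked example of that rule, a back-of-the-envelope calculation; all throughput numbers below are assumptions, so benchmark your own producers and consumers before choosing a count.
// Partition count from target throughput and measured per-partition throughput
long targetThroughput = 10_000;     // messages/sec the topic must sustain
long producerPerPartition = 2_000;  // measured producer throughput per partition
long consumerPerPartition = 1_000;  // measured consumer throughput per partition

long partitions = Math.max(
    (targetThroughput + producerPerPartition - 1) / producerPerPartition,   // ceiling division
    (targetThroughput + consumerPerPartition - 1) / consumerPerPartition);

System.out.println(partitions); // 10 here; the 12 used above leaves headroom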
2. Replication Factor
# Production: 3 replicas for fault tolerance
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic production-topic \
--partitions 6 \
--replication-factor 3
# Development: 1 replica for resource efficiency
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic dev-topic \
--partitions 3 \
--replication-factor 1
3. Retention Configuration
# High-volume, short-term data (1 day)
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name high-volume-topic \
--add-config retention.ms=86400000
# Low-volume, long-term data (30 days)
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name audit-logs \
--add-config retention.ms=2592000000
4. Topic Naming Best Practices
# Use descriptive, consistent names
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ecommerce-orders-v1
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ecommerce-orders-v2
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ecommerce-orders-dlq
# Include environment prefix
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic prod-user-events
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic staging-user-events
Common Topic Patterns
1. Event Sourcing Topics
# Event sourcing pattern
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-events --config cleanup.policy=compact
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic order-events --config cleanup.policy=compact
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic inventory-events --config cleanup.policy=compact
2. CQRS Topics
# Command topics
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-commands
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic order-commands
# Query topics
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-queries
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic order-queries
3. Dead Letter Queue Topics
# Main topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-events
# Dead letter queue for failed messages
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic user-events-dlq
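A consumer that cannot process a record typically republishes it to the DLQ topic rather than blocking the partition. A minimal sketch of that hand-off; records, dlqProducer, and process() are assumed to exist in the surrounding application code.
// Route records that fail processing to the dead letter topic
for (ConsumerRecord<String, String> record : records) {
    try {
        process(record); // hypothetical business logic
    } catch (Exception e) {
        // Forward the failed record for later inspection and reprocessing
        dlqProducer.send(new ProducerRecord<>("user-events-dlq", record.key(), record.value()));
    }
}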
Topic Monitoring and Alerting
1. Key Metrics to Monitor
- Messages per second: Topic throughput
- Bytes in/out: Data volume
- Consumer lag: Processing delay
- Partition count: Scalability indicator
- Replication factor: Fault tolerance
2. Alerting Rules
# Example alerting configuration (illustrative, tool-agnostic)
alerts:
  - name: "High Consumer Lag"
    condition: "consumer_lag > 10000"
    severity: "warning"
  - name: "Topic Unavailable"
    condition: "topic_status != 'healthy'"
    severity: "critical"
  - name: "High Error Rate"
    condition: "error_rate > 0.01"
    severity: "warning"
Best Practices Summary
- Choose appropriate partition count based on throughput and parallelism needs
- Use replication factor 3 for production environments
- Implement proper retention policies based on data requirements
- Use descriptive topic names with consistent naming conventions
- Monitor topic metrics for performance and health
- Implement proper security with ACLs and encryption
- Use log compaction for key-value data that needs latest state
- Plan for topic evolution with versioning strategies
- Test topic configurations before production deployment
- Document topic purposes and configurations for team knowledge
Kafka Topics are the foundation of data organization in Kafka. Proper topic design, configuration, and management are essential for building scalable and reliable data streaming applications.